feat(sqs): wire partitioned-FIFO data plane through dispatch helpers (Phase 3.D PR 5b-2)#732
feat(sqs): wire partitioned-FIFO data plane through dispatch helpers (Phase 3.D PR 5b-2)#732
Conversation
…(Phase 3.D PR 5b-2)
Stage 2 of PR 5b: wire SendMessage / ReceiveMessage / DeleteMessage
/ ChangeMessageVisibility through the per-key dispatch helpers
landed in PR 5b-1. The section 11 PR 2 dormancy gate still rejects
PartitionCount > 1 at CreateQueue, so production queues remain on
the legacy keyspace and observable behaviour is byte-identical.
PR 5b-3 lifts the gate atomically with the capability check.
Wiring
- SendMessage (sendMessage, sendBatchStandardOnce): partition is
a compile-time 0 (Standard queues reject PartitionCount > 1
via the cross-attribute validator). Dispatch helpers route to
legacy on the steady-state path.
- sendFifoMessage: hashes MessageGroupId once via partitionFor
and threads the result through every key construction (data,
vis, byage, dedup, group-lock).
- ReceiveMessage: scanAndDeliverOnce wraps the scan loop with a
per-partition fanout over effectivePartitionCount(meta)
iterations. The shared wall-clock + per-call max budget caps
apply across the whole call, not per-partition. sqsMsgCandidate
carries the partition field stamped at scan time so downstream
rotate / delete / expire helpers route to the right partition.
- DeleteMessage / ChangeMessageVisibility: handle.Partition from
the v2 receipt handle drives every key construction.
Receipt-handle dispatch
encodeReceiptHandleDispatch(meta, partition, ...) is the single
point that picks v1 vs v2. ReceiveMessage on a partitioned queue
records cand.partition into the handle so DeleteMessage routes
back to the same partition.
decodeClientReceiptHandle is now version-agnostic; the queue-
aware version check moved to validateReceiptHandleVersion which
runs in loadMessageForDelete / loadAndVerifyMessage once meta is
in scope. v1 on a partitioned queue and v2 on a non-partitioned
queue both surface as ReceiptHandleIsInvalid -- preserving the
dormancy promise (no v2 wire-format probability from the public
API) under the new contract. The section 11 PR 2 dormancy gate
still in force in this PR keeps every queue non-partitioned, so
v2 handles still surface as ReceiptHandleIsInvalid downstream,
the exact PR 5a observable behaviour.
Reaper
Reaper iterates legacy byAge keys only -- partition-iterating
enumeration ships in the later partition reaper PR. buildReapOps
/ reapOneRecord pass nil meta + partition 0 through dispatch
helpers so the keys are byte-identical to the pre-PR-5b layout.
Tests (10 added)
Unit tests (sqs_keys_dispatch_v2_test.go):
- TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount:
pins the version dispatch decision across nil/0/1/4 metas.
- TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1:
protects the byte-identical guarantee on legacy queues.
- TestEncodeReceiptHandleDispatch_PerQueueUsesV2: pins that
perQueue + PartitionCount=4 still produces v2 handles
(keyspace is partitioned even when partitionFor collapses
every group to partition 0).
- TestValidateReceiptHandleVersion_QueueAwareRules: 10
sub-cases covering the (meta.PartitionCount x handle.Version)
matrix, including perQueue + PartitionCount=4.
- TestValidateReceiptHandleVersion_NilHandle: defensive nil.
- TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned:
named regression for the dormancy guarantee under the new
contract.
- TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy:
byte-identical to legacy sqsMsgVisScanBounds on legacy meta.
- TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix:
different partitions yield disjoint scan ranges.
- TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace:
pins the PR 731 round 2 forward-note invariant -- perQueue
+ PartitionCount=4 keeps the partitioned vis prefix at
partition 0; collapsing to legacy would silently strand
send writes.
Integration tests (sqs_partitioned_dispatch_test.go) install a
partitioned meta directly on a queue created via the public API,
short-circuiting the dormancy gate without disabling it for
production CreateQueue:
- TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip:
end-to-end smoke test (send 6 groups, receive surfaces all
via fanout, every handle is v2, delete via v2 handle, queue
is empty afterwards, legacy keyspace stays empty).
- TestSQSServer_PartitionedFIFO_RejectsV1Handle: forged v1
handle on a partitioned queue surfaces as ReceiptHandleIsInvalid
via DeleteMessage and ChangeMessageVisibility.
- TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero:
perQueue + PartitionCount=4 receive surfaces every message
in one fanout pass; every v2 handle records Partition=0.
Updated TestDecodeClientReceiptHandle_RejectsV2 to
TestDecodeClientReceiptHandle_AcceptsV2 to reflect the contract
shift (rejection moved from API boundary to meta-aware
validateReceiptHandleVersion).
Self-review (per CLAUDE.md)
1. Data loss -- Dispatch helpers byte-identical on legacy queues
(PartitionCount<=1 routes to legacy constructors). DLQ FIFO
computes dlqPartition via partitionFor(dlqMeta, srcRec.MessageGroupId)
so cross-queue redrive lands in the right partition. Receive
fanout scans every partition. No issue.
2. Concurrency -- Receive fanout iterates partitions sequentially
under one shared wall-clock + max budget, so the original
per-call SLA is preserved. meta is loaded once and passed by
pointer through helpers; no concurrent meta refresh in a
single call. validateReceiptHandleVersion runs after the gen
check so a SetQueueAttributes race cannot flip the answer
(PartitionCount is immutable). go test -race clean.
3. Performance -- Hot path adds 1 nil-check + 1 PartitionCount
compare per dispatch. Receive fanout iterates effectivePartitionCount(meta)
times: 1 on legacy / perQueue, N on perMessageGroupId, with
N partitions splitting roughly 1/N of the messages each.
4. Data consistency -- Receipt handle's recorded partition matches
the storage partition (commitReceiveRotation uses cand.partition
for both newVisKey and the handle). Cross-version handles
rejected as ReceiptHandleIsInvalid -- no leak between keyspaces.
perQueue + PartitionCount=4 invariant pinned by both unit and
integration tests.
5. Test coverage -- 10 new tests across the contract surface.
Existing legacy-path SQS tests pass unchanged.
go test -race ./adapter/... and golangci-lint clean.
📝 WalkthroughWalkthroughThis PR introduces partition-aware dispatch mechanisms for SQS message keys and receipt-handle encoding. Core dispatch helpers (sqsMsgDataKeyDispatch, sqsMsgVisKeyDispatch, etc.) are wired into FIFO send/receive, standard batch operations, deletion, visibility updates, redrive, and reaper flows to route and encode messages by partition. Partition-stamped message candidates enable downstream operations to use correct per-partition keys. V2 receipt-handle format and queue-aware version validation are added, with legacy non-partitioned queues (partition count ≤1) remaining on v1. ChangesPartition-Aware SQS Dispatch
Sequence DiagramsequenceDiagram
participant Client
participant SQS Server
participant Partitioner
participant Visibility Store
participant Data Store
rect rgba(100, 150, 200, 0.5)
Note over Client,Data Store: Partitioned FIFO Send
Client->>SQS Server: SendMessage(MessageGroupId="group-A")
SQS Server->>Partitioner: partitionFor(meta, "group-A")
Partitioner-->>SQS Server: partition = 2
SQS Server->>Data Store: Write(sqsMsgDataKeyDispatch(..., partition=2, ...))
SQS Server->>Visibility Store: Write(sqsMsgVisKeyDispatch(..., partition=2, ...))
SQS Server->>SQS Server: encodeReceiptHandleDispatch(..., partition=2, ...) → v2 handle
SQS Server-->>Client: SendMessageResponse(ReceiptHandle=v2)
end
rect rgba(200, 100, 150, 0.5)
Note over Client,Data Store: Partitioned Fanout Receive
Client->>SQS Server: ReceiveMessage(MaxMessages=10)
SQS Server->>SQS Server: readTS → startPartitionOffset → partition[0,1,2,3]
loop For Each Partition
SQS Server->>Visibility Store: Scan(sqsMsgVisScanBoundsDispatch(..., partition, ...))
Visibility Store-->>SQS Server: Candidates{partition, visKey, messageID}
end
SQS Server->>Data Store: Load(sqsMsgDataKeyDispatch(..., cand.partition, ...))
SQS Server->>SQS Server: encodeReceiptHandleDispatch(..., cand.partition, ...) → v2 handle
SQS Server-->>Client: ReceiveMessageResponse(Messages=[...], ReceiptHandles=[v2])
end
rect rgba(150, 200, 100, 0.5)
Note over Client,Data Store: Partition-Aware Delete
Client->>SQS Server: DeleteMessage(ReceiptHandle=v2)
SQS Server->>SQS Server: validateReceiptHandleVersion(meta, v2_handle)
SQS Server->>Data Store: Delete(sqsMsgDataKeyDispatch(..., v2_handle.partition, ...))
SQS Server->>Visibility Store: Delete(sqsMsgVisKeyDispatch(..., v2_handle.partition, ...))
SQS Server-->>Client: DeleteMessageResponse()
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Tip 💬 Introducing [Slack Agent](https://www.coderabbit.ai/agent): Turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. 👉 Get your free trial and get 200 agent minutes per Slack user (a $50 value). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 0/1 reviews remaining, refill in 60 minutes.Comment |
|
@claude review |
|
Claude finished @bootjp's task in 8m 28s —— View job Code Review: feat(sqs): wire partitioned-FIFO data plane through dispatch helpers (Phase 3.D PR 5b-2)
Overall assessmentThis is a well-structured, carefully staged PR. The core abstraction — a single dispatch layer ( Medium:
|
| Path | Partition source | Consistent with send? |
|---|---|---|
sendFifoMessage key construction |
partitionFor(meta, in.MessageGroupId) |
✅ |
commitReceiveRotation vis-key + handle |
cand.partition (stamped at scan) |
✅ |
buildDeleteOps vis/byage/group-lock keys |
handle.Partition (v2 handle) |
✅ |
changeVisibilityWithRetry old/new vis key |
handle.Partition (v2 handle) |
✅ |
buildRedriveOps DLQ keys |
partitionFor(dlqMeta, dlqRec.MessageGroupId) |
✅ |
Reaper (buildReapOps) |
nil meta + partition 0 → legacy only |
✅ (intentional, PR 6 will extend) |
validateReceiptHandleVersion |
version-only check |
Snapshot consistency: readTS is computed once in scanAndDeliverOnce and threaded identically through all partition fanout calls — consistent multi-partition snapshot. ✅
Group-lock partition coherence: partitionFor is deterministic (FNV-1a, power-of-two mask); a given MessageGroupId maps to exactly one partition for the lifetime of a queue generation. The group lock is always written and read at that same partition. ✅
PartitionCount immutability: validatePartitionImmutability is enforced on SetQueueAttributes, making the meta-dispatch pattern race-safe without re-loading meta inside OCC transactions. ✅
Dormancy preservation: Every code path that could emit a v2 handle is gated on meta.PartitionCount > 1, which CreateQueue continues to reject via validatePartitionDormancyGate until PR 5b-3. ✅
Test coverage
10 new tests are well-targeted. Particular highlights:
TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspacedirectly pins the forward-note invariant from PR 731 round 2 (perQueue + partitioned keyspace must scan partitioned prefix, not legacy).TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTripsanity-checks the legacy keyspace is empty after all operations, which is the cleanest possible regression gate for future dispatch regressions.TestDecodeClientReceiptHandle_AcceptsV2correctly documents the contract shift.
Missing: a test that a v2 handle with Partition >= meta.PartitionCount is rejected (tracking the medium finding above).
Verdict: Ready to merge after the partition bounds check is added to validateReceiptHandleVersion and covered by a test. The rest of the implementation is sound.
There was a problem hiding this comment.
Code Review
This pull request implements the data plane wiring for partitioned FIFO queues. It introduces dispatch helpers to route key construction, receipt handle encoding, and version validation based on the queue's partition count. Updates were made to message sending, receiving, deletion, and redriving logic to support these partitions. A review comment pointed out an improvement opportunity in the integration tests to use a more robust helper for calculating prefix scan bounds instead of manually appending bytes.
| // Cap the prefix scan at the generation byte so we do not | ||
| // drag in unrelated queues. | ||
| end := append([]byte(nil), legacyDataPrefix...) | ||
| end = append(end, 0xFF, 0xFF, 0xFF, 0xFF) |
There was a problem hiding this comment.
The logic for creating the end key for the prefix scan is incorrect. Appending 0xFF bytes does not correctly create an exclusive upper bound for all cases, especially when the prefix itself contains 0xFF bytes, which could cause the scan to read into adjacent key ranges (e.g., a different queue generation). This could lead to test flakiness or incorrect test behavior.
A more robust way to create an end key for a prefix is to find the next lexicographical key after the prefix. The codebase already has a helper for this, prefixScanEnd. You should use it here for correctness and consistency.
| // Cap the prefix scan at the generation byte so we do not | |
| // drag in unrelated queues. | |
| end := append([]byte(nil), legacyDataPrefix...) | |
| end = append(end, 0xFF, 0xFF, 0xFF, 0xFF) | |
| // The end key for a prefix scan is the smallest key that does not | |
| // have the prefix. This correctly bounds the scan to only keys | |
| // within the given queue and generation. | |
| end := prefixScanEnd(legacyDataPrefix) |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 443ceb5f9c
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| for partition := uint32(0); partition < partitions; partition++ { | ||
| if len(delivered) >= opts.Max { | ||
| break |
There was a problem hiding this comment.
Avoid fixed partition-0 priority in receive fanout
When PartitionCount > 1, this loop always starts at partition 0 and exits as soon as opts.Max is filled, so repeated ReceiveMessage calls can permanently starve higher-index partitions whenever partition 0 has a sustained backlog. This only appears on partitioned queues, but once enabled it can make some message groups effectively undeliverable under load because later partitions are never scanned.
Useful? React with 👍 / 👎.
| return errors.New("receipt handle version mismatch for non-partitioned queue") | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Reject out-of-range v2 receipt-handle partitions
The queue-aware version check now accepts any v2 handle on partitioned queues but never validates handle.Partition < meta.PartitionCount. A forged v2 handle with an out-of-range partition is then used to build partitioned keys in delete/change-visibility paths; the partition resolver fails closed for unknown partitions, which bubbles up as a generic internal error instead of ReceiptHandleIsInvalid. This is reachable once partitioned queues are enabled.
Useful? React with 👍 / 👎.
…und 1) Codex P1: with the fanout always starting at partition 0, a sustained backlog on partition 0 would fill opts.Max before the loop reached partition 1 — messages in higher-index partitions were never observed under load. Rotate the starting partition by readTS so consecutive receives spread across every partition. Helper startPartitionOffset folds the upper and lower 32 bits of the HLC-derived readTS and mask-ANDs by partitions-1 (PartitionCount is power-of-two by validator invariant). The byte-slice fold preserves entropy from both HLC halves while keeping the conversion in uint32 space — no //nolint needed for gosec G115. FIFO ordering is unaffected: partitionFor is deterministic, so a MessageGroupId always lands in one partition and cross-partition iteration order does not reorder messages within any group. Caller audit: scanAndDeliverOnce / scanAndDeliverPartition signatures and error semantics are unchanged; only the internal iteration order rotates. New helper has only the one in-package caller.
|
@claude review Round 1 addressing Codex P1: rotate the receive-fanout starting partition by readTS so a sustained backlog on a single partition cannot starve higher-index partitions. Changes:
FIFO ordering: unaffected. Caller audit: P2/medium findings (out-of-range v2 partition validation, prefix-scan helper in test) are intentionally deferred — below the loop's P0/P1/high/critical/major threshold. |
|
Claude finished @bootjp's task in 2m 22s —— View job Code Review: Round 1 — Partition-starvation fix (
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 52c449f5b3
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| startOffset := startPartitionOffset(partitions, readTS) | ||
| for i := uint32(0); i < partitions; i++ { |
There was a problem hiding this comment.
Rotate receive start independently from commit-ts stride
Using readTS as the only rotation source can still starve partitions under sustained load, because readTS comes from the last committed timestamp and often advances by a fixed stride (roughly the number of successful per-message rotations in each receive call). With power-of-two partitions, masking this structured value can lock start offsets to a subset (e.g. 4 partitions + MaxNumberOfMessages=10 can alternate between two starts), so if those early partitions stay backlogged, later partitions are never scanned.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
adapter/sqs_partitioned_dispatch_test.go (1)
288-317: ⚡ Quick winPin the single-pass behavior instead of retrying receives.
This loop lets the test pass even if
effectivePartitionCountnever collapses to 1, because repeated receives can still drain partition 0 eventually. If the contract here is “one receive call must surface every message,” make that the assertion directly.Suggested tightening
collected := make(map[string]bool, len(groups)) - for range 4 { - if len(collected) == len(groups) { - break - } - status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ - "QueueUrl": queueURL, - "MaxNumberOfMessages": 10, - "VisibilityTimeout": 60, - }) - require.Equal(t, http.StatusOK, status, "receive: %v", out) - msgs, _ := out["Messages"].([]any) - for _, m := range msgs { - mm, _ := m.(map[string]any) - body, _ := mm["Body"].(string) - collected[body] = true - handle, _ := mm["ReceiptHandle"].(string) - parsed, err := decodeReceiptHandle(handle) - require.NoError(t, err) - require.Equal(t, sqsReceiptHandleVersion2, parsed.Version, - "perQueue + PartitionCount=4 must still produce v2 handles") - require.Equal(t, uint32(0), parsed.Partition, - "perQueue mode pins every group to partition 0, so every handle must record Partition=0") - } - } + status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MaxNumberOfMessages": 10, + "VisibilityTimeout": 60, + }) + require.Equal(t, http.StatusOK, status, "receive: %v", out) + msgs, _ := out["Messages"].([]any) + for _, m := range msgs { + mm, _ := m.(map[string]any) + body, _ := mm["Body"].(string) + collected[body] = true + handle, _ := mm["ReceiptHandle"].(string) + parsed, err := decodeReceiptHandle(handle) + require.NoError(t, err) + require.Equal(t, sqsReceiptHandleVersion2, parsed.Version, + "perQueue + PartitionCount=4 must still produce v2 handles") + require.Equal(t, uint32(0), parsed.Partition, + "perQueue mode pins every group to partition 0, so every handle must record Partition=0") + } require.Len(t, collected, len(groups), "perQueue receive must surface every message in one fanout pass over partition 0")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@adapter/sqs_partitioned_dispatch_test.go` around lines 288 - 317, The test currently loops multiple receive calls which masks failures by eventually draining partition 0; instead make a single receive call using callSQS(sqsReceiveMessageTarget, ...) and assert that the response contains exactly len(groups) Messages in that one call, then iterate those messages to validate decodeReceiptHandle parsed.Version equals sqsReceiptHandleVersion2 and parsed.Partition == 0; remove the retry loop and fail the test immediately if the single receive doesn't return all messages so the contract “one receive call must surface every message” is enforced.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@adapter/sqs_keys_dispatch.go`:
- Around line 182-195: In validateReceiptHandleVersion, add a bounds check for
v2 handles so that when meta != nil && meta.PartitionCount > 1 and
handle.Version == sqsReceiptHandleVersion2 you also verify handle.Partition <
meta.PartitionCount; if the partition is out of range return the
ReceiptHandleIsInvalid error (or an appropriate error indicating invalid receipt
handle) instead of allowing it to fall through to downstream routing; update the
function logic around validateReceiptHandleVersion to perform this check before
returning nil.
In `@adapter/sqs_partitioned_dispatch_test.go`:
- Around line 91-106: The test currently only asserts parsed.Partition is within
a range which can hide misrouting; instead, after populating sent (the map from
group -> MessageId) reverse that map to look up group by MessageId and assert
that for each received handle the decoded parsed.Partition equals
partitionFor(meta, group). Concretely: after sending messages with callSQS
(using MessageGroupId) and collecting sent[g]=msgID, when you decode handles
check the MessageId from the handle, find the original group via the reversed
sent map, then replace the loose parsed.Partition < 4 assertion with
require.Equal(t, partitionFor(meta, group), parsed.Partition) so each message is
asserted to land on its expected partition.
- Around line 160-191: The current post-delete check re-calls
callSQS(sqsReceiveMessageTarget) immediately and can miss undeleted but
invisible messages (earlier receives used VisibilityTimeout: 60), so a
regression that leaves records invisible would still pass; instead of relying on
immediate receive, either wait > VisibilityTimeout before re-receiving or
(preferred) directly assert the partitioned keyspace is empty by scanning the
partitioned data and visibility prefixes with node.sqsServer.store.ScanAt using
the queue generation from node.sqsServer.loadQueueMetaAt (you already use
sqsMsgDataKey for the legacy prefix—compute the analogous partitioned
data/visibility prefixes for this queue generation and assert ScanAt returns
zero entries), keeping the existing legacy-prefix check for completeness.
---
Nitpick comments:
In `@adapter/sqs_partitioned_dispatch_test.go`:
- Around line 288-317: The test currently loops multiple receive calls which
masks failures by eventually draining partition 0; instead make a single receive
call using callSQS(sqsReceiveMessageTarget, ...) and assert that the response
contains exactly len(groups) Messages in that one call, then iterate those
messages to validate decodeReceiptHandle parsed.Version equals
sqsReceiptHandleVersion2 and parsed.Partition == 0; remove the retry loop and
fail the test immediately if the single receive doesn't return all messages so
the contract “one receive call must surface every message” is enforced.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 467f55af-49a9-40c4-a7bc-c9b223133d6e
📒 Files selected for processing (9)
adapter/sqs_fifo.goadapter/sqs_keys_dispatch.goadapter/sqs_keys_dispatch_v2_test.goadapter/sqs_messages.goadapter/sqs_messages_batch.goadapter/sqs_partitioned_dispatch_test.goadapter/sqs_reaper.goadapter/sqs_receipt_handle_v2_test.goadapter/sqs_redrive.go
| func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error { | ||
| if handle == nil { | ||
| return errors.New("receipt handle is nil") | ||
| } | ||
| if meta != nil && meta.PartitionCount > 1 { | ||
| if handle.Version != sqsReceiptHandleVersion2 { | ||
| return errors.New("receipt handle version mismatch for partitioned queue") | ||
| } | ||
| return nil | ||
| } | ||
| if handle.Version != sqsReceiptHandleVersion1 { | ||
| return errors.New("receipt handle version mismatch for non-partitioned queue") | ||
| } | ||
| return nil |
There was a problem hiding this comment.
Reject out-of-range v2 partitions in the meta-aware validator.
handle.Partition is now client-controlled once decodeClientReceiptHandle accepts v2. This helper only checks the version, so Partition >= meta.PartitionCount falls through to sqsMsg*KeyDispatch and depends on downstream routing failure semantics instead of returning ReceiptHandleIsInvalid. This is the right choke point to bounds-check before PR 5b-3 lifts the gate.
Suggested fix
func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error {
if handle == nil {
return errors.New("receipt handle is nil")
}
if meta != nil && meta.PartitionCount > 1 {
if handle.Version != sqsReceiptHandleVersion2 {
return errors.New("receipt handle version mismatch for partitioned queue")
}
+ if handle.Partition >= meta.PartitionCount {
+ return errors.New("receipt handle partition out of range for queue")
+ }
return nil
}
if handle.Version != sqsReceiptHandleVersion1 {
return errors.New("receipt handle version mismatch for non-partitioned queue")
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error { | |
| if handle == nil { | |
| return errors.New("receipt handle is nil") | |
| } | |
| if meta != nil && meta.PartitionCount > 1 { | |
| if handle.Version != sqsReceiptHandleVersion2 { | |
| return errors.New("receipt handle version mismatch for partitioned queue") | |
| } | |
| return nil | |
| } | |
| if handle.Version != sqsReceiptHandleVersion1 { | |
| return errors.New("receipt handle version mismatch for non-partitioned queue") | |
| } | |
| return nil | |
| func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error { | |
| if handle == nil { | |
| return errors.New("receipt handle is nil") | |
| } | |
| if meta != nil && meta.PartitionCount > 1 { | |
| if handle.Version != sqsReceiptHandleVersion2 { | |
| return errors.New("receipt handle version mismatch for partitioned queue") | |
| } | |
| if handle.Partition >= meta.PartitionCount { | |
| return errors.New("receipt handle partition out of range for queue") | |
| } | |
| return nil | |
| } | |
| if handle.Version != sqsReceiptHandleVersion1 { | |
| return errors.New("receipt handle version mismatch for non-partitioned queue") | |
| } | |
| return nil | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@adapter/sqs_keys_dispatch.go` around lines 182 - 195, In
validateReceiptHandleVersion, add a bounds check for v2 handles so that when
meta != nil && meta.PartitionCount > 1 and handle.Version ==
sqsReceiptHandleVersion2 you also verify handle.Partition < meta.PartitionCount;
if the partition is out of range return the ReceiptHandleIsInvalid error (or an
appropriate error indicating invalid receipt handle) instead of allowing it to
fall through to downstream routing; update the function logic around
validateReceiptHandleVersion to perform this check before returning nil.
| groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"} | ||
| sent := make(map[string]string, len(groups)) // group -> messageID | ||
| for i, g := range groups { | ||
| body := "body-" + g | ||
| dedup := "dedup-" + g | ||
| status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ | ||
| "QueueUrl": queueURL, | ||
| "MessageBody": body, | ||
| "MessageGroupId": g, | ||
| "MessageDeduplicationId": dedup, | ||
| }) | ||
| require.Equal(t, http.StatusOK, status, | ||
| "send #%d (group=%s): %v", i, g, out) | ||
| msgID, _ := out["MessageId"].(string) | ||
| require.NotEmpty(t, msgID, "send #%d: empty MessageId", i) | ||
| sent[g] = msgID |
There was a problem hiding this comment.
Assert the expected partition, not just that it is in range.
parsed.Partition < 4 still passes if every MessageGroupId is accidentally routed to partition 0, so this round-trip would miss the main dispatch regression it is meant to catch. Since the test already records group -> MessageId, reverse that map and compare each decoded handle partition to partitionFor(meta, group).
Suggested tightening
groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"}
sent := make(map[string]string, len(groups)) // group -> messageID
+ byMessageID := make(map[string]string, len(groups)) // messageID -> group
for i, g := range groups {
body := "body-" + g
dedup := "dedup-" + g
status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{
@@
require.Equal(t, http.StatusOK, status,
"send #%d (group=%s): %v", i, g, out)
msgID, _ := out["MessageId"].(string)
require.NotEmpty(t, msgID, "send #%d: empty MessageId", i)
sent[g] = msgID
+ byMessageID[msgID] = g
}
+
+ ctx := context.Background()
+ readTS := node.sqsServer.nextTxnReadTS(ctx)
+ meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS)
+ require.NoError(t, err)
+ require.True(t, exists)
@@
require.Less(t, parsed.Partition, uint32(4),
"v2 handle partition out of range")
+ require.Equal(t,
+ partitionFor(meta, byMessageID[id]),
+ parsed.Partition,
+ "message %s routed to wrong partition", id,
+ )
collected[id] = handle
}
}Also applies to: 131-139
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@adapter/sqs_partitioned_dispatch_test.go` around lines 91 - 106, The test
currently only asserts parsed.Partition is within a range which can hide
misrouting; instead, after populating sent (the map from group -> MessageId)
reverse that map to look up group by MessageId and assert that for each received
handle the decoded parsed.Partition equals partitionFor(meta, group).
Concretely: after sending messages with callSQS (using MessageGroupId) and
collecting sent[g]=msgID, when you decode handles check the MessageId from the
handle, find the original group via the reversed sent map, then replace the
loose parsed.Partition < 4 assertion with require.Equal(t, partitionFor(meta,
group), parsed.Partition) so each message is asserted to land on its expected
partition.
| // Queue must now be empty even after the visibility timeout | ||
| // would have re-exposed it. | ||
| status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ | ||
| "QueueUrl": queueURL, | ||
| "MaxNumberOfMessages": 10, | ||
| }) | ||
| require.Equal(t, http.StatusOK, status, "post-delete receive: %v", out) | ||
| if msgs, _ := out["Messages"].([]any); len(msgs) > 0 { | ||
| t.Fatalf("expected empty queue after delete; got %d messages", len(msgs)) | ||
| } | ||
|
|
||
| // Sanity: the legacy keyspace must be empty (every send on | ||
| // this queue went to the partitioned keyspace, never the | ||
| // legacy one). We probe the legacy data prefix at this | ||
| // queue's generation: it must yield zero entries. | ||
| ctx := context.Background() | ||
| readTS := node.sqsServer.nextTxnReadTS(ctx) | ||
| meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS) | ||
| require.NoError(t, err) | ||
| require.True(t, exists) | ||
| legacyDataPrefix := sqsMsgDataKey(queueName, meta.Generation, "") | ||
| // Cap the prefix scan at the generation byte so we do not | ||
| // drag in unrelated queues. | ||
| end := append([]byte(nil), legacyDataPrefix...) | ||
| end = append(end, 0xFF, 0xFF, 0xFF, 0xFF) | ||
| page, err := node.sqsServer.store.ScanAt(ctx, legacyDataPrefix, end, 32, readTS) | ||
| require.NoError(t, err) | ||
| for _, kvp := range page { | ||
| require.False(t, strings.HasPrefix(string(kvp.Key), string(legacyDataPrefix)), | ||
| "legacy data key found on a partitioned queue (key=%q) — wiring leaked into the wrong keyspace", | ||
| string(kvp.Key)) | ||
| } |
There was a problem hiding this comment.
The post-delete emptiness check can still pass with undeleted invisible messages.
All earlier receives use VisibilityTimeout: 60, but this assertion re-receives immediately. If DeleteMessage regresses into “leave the record invisible but not removed,” this block still passes. The legacy-prefix probe below does not close that gap because it never inspects the partitioned keyspace.
Stronger options
- // Queue must now be empty even after the visibility timeout
- // would have re-exposed it.
+ // Queue must now be empty. To avoid a false pass from in-flight
+ // visibility, either wait past a short timeout or inspect the
+ // partitioned keyspace directly.- "VisibilityTimeout": 60,
+ "VisibilityTimeout": 1,+ time.Sleep(1100 * time.Millisecond)
status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
"QueueUrl": queueURL,
"MaxNumberOfMessages": 10,
})Or, better for determinism, scan the partitioned data/visibility prefixes and assert they are empty after the deletes.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Queue must now be empty even after the visibility timeout | |
| // would have re-exposed it. | |
| status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ | |
| "QueueUrl": queueURL, | |
| "MaxNumberOfMessages": 10, | |
| }) | |
| require.Equal(t, http.StatusOK, status, "post-delete receive: %v", out) | |
| if msgs, _ := out["Messages"].([]any); len(msgs) > 0 { | |
| t.Fatalf("expected empty queue after delete; got %d messages", len(msgs)) | |
| } | |
| // Sanity: the legacy keyspace must be empty (every send on | |
| // this queue went to the partitioned keyspace, never the | |
| // legacy one). We probe the legacy data prefix at this | |
| // queue's generation: it must yield zero entries. | |
| ctx := context.Background() | |
| readTS := node.sqsServer.nextTxnReadTS(ctx) | |
| meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS) | |
| require.NoError(t, err) | |
| require.True(t, exists) | |
| legacyDataPrefix := sqsMsgDataKey(queueName, meta.Generation, "") | |
| // Cap the prefix scan at the generation byte so we do not | |
| // drag in unrelated queues. | |
| end := append([]byte(nil), legacyDataPrefix...) | |
| end = append(end, 0xFF, 0xFF, 0xFF, 0xFF) | |
| page, err := node.sqsServer.store.ScanAt(ctx, legacyDataPrefix, end, 32, readTS) | |
| require.NoError(t, err) | |
| for _, kvp := range page { | |
| require.False(t, strings.HasPrefix(string(kvp.Key), string(legacyDataPrefix)), | |
| "legacy data key found on a partitioned queue (key=%q) — wiring leaked into the wrong keyspace", | |
| string(kvp.Key)) | |
| } | |
| // Queue must now be empty. To avoid a false pass from in-flight | |
| // visibility, either wait past a short timeout or inspect the | |
| // partitioned keyspace directly. | |
| status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{ | |
| "QueueUrl": queueURL, | |
| "MaxNumberOfMessages": 10, | |
| }) | |
| require.Equal(t, http.StatusOK, status, "post-delete receive: %v", out) | |
| if msgs, _ := out["Messages"].([]any); len(msgs) > 0 { | |
| t.Fatalf("expected empty queue after delete; got %d messages", len(msgs)) | |
| } | |
| // Sanity: the legacy keyspace must be empty (every send on | |
| // this queue went to the partitioned keyspace, never the | |
| // legacy one). We probe the legacy data prefix at this | |
| // queue's generation: it must yield zero entries. | |
| ctx := context.Background() | |
| readTS := node.sqsServer.nextTxnReadTS(ctx) | |
| meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS) | |
| require.NoError(t, err) | |
| require.True(t, exists) | |
| legacyDataPrefix := sqsMsgDataKey(queueName, meta.Generation, "") | |
| // Cap the prefix scan at the generation byte so we do not | |
| // drag in unrelated queues. | |
| end := append([]byte(nil), legacyDataPrefix...) | |
| end = append(end, 0xFF, 0xFF, 0xFF, 0xFF) | |
| page, err := node.sqsServer.store.ScanAt(ctx, legacyDataPrefix, end, 32, readTS) | |
| require.NoError(t, err) | |
| for _, kvp := range page { | |
| require.False(t, strings.HasPrefix(string(kvp.Key), string(legacyDataPrefix)), | |
| "legacy data key found on a partitioned queue (key=%q) — wiring leaked into the wrong keyspace", | |
| string(kvp.Key)) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@adapter/sqs_partitioned_dispatch_test.go` around lines 160 - 191, The current
post-delete check re-calls callSQS(sqsReceiveMessageTarget) immediately and can
miss undeleted but invisible messages (earlier receives used VisibilityTimeout:
60), so a regression that leaves records invisible would still pass; instead of
relying on immediate receive, either wait > VisibilityTimeout before
re-receiving or (preferred) directly assert the partitioned keyspace is empty by
scanning the partitioned data and visibility prefixes with
node.sqsServer.store.ScanAt using the queue generation from
node.sqsServer.loadQueueMetaAt (you already use sqsMsgDataKey for the legacy
prefix—compute the analogous partitioned data/visibility prefixes for this queue
generation and assert ScanAt returns zero entries), keeping the existing
legacy-prefix check for completeness.
Summary
Stage 2 of Phase 3.D PR 5b: wire
SendMessage/ReceiveMessage/DeleteMessage/ChangeMessageVisibilitythrough the per-key dispatch helpers landed in PR 5b-1 (#731). The §11 PR 2 dormancy gate still rejectsPartitionCount > 1atCreateQueue, so production queues stay on the legacy keyspace and observable behaviour is byte-identical. PR 5b-3 lifts the gate atomically with the capability check.What's wired
Send paths
sendMessage,sendBatchStandardOnce: partition is a compile-time0(Standard queues rejectPartitionCount > 1via the cross-attribute validator). Dispatch helpers route to the legacy keyspace on the steady-state path.sendFifoMessage: hashesMessageGroupIdonce viapartitionForand threads the result through every key construction (data, vis, byage, dedup, group-lock).Receive fanout
scanAndDeliverOncewraps the scan loop with a per-partition fanout overeffectivePartitionCount(meta)iterations. The shared wall-clock + per-callMaxbudget caps apply across the whole call, not per-partition.sqsMsgCandidatecarries a newpartitionfield stamped at scan time, soloadCandidateRecord/expireMessage/commitReceiveRotation/classifyFifoGroupLockall route to the partition the message was originally stored under.Delete + ChangeMessageVisibility
handle.Partitionfrom the v2 receipt handle drives every key construction.loadMessageForDelete/loadAndVerifyMessageinvoke the newvalidateReceiptHandleVersiononce meta is in scope.Receipt-handle dispatch (the contract shift)
encodeReceiptHandleDispatch(meta, partition, ...)is the single point that picks v1 vs v2.decodeClientReceiptHandleis now version-agnostic; the queue-aware version check moved tovalidateReceiptHandleVersion. v1-on-partitioned and v2-on-non-partitioned both surface asReceiptHandleIsInvalid, preserving the dormancy promise (no v2 wire-format probability from the public API) under the new contract. Since the §11 PR 2 dormancy gate keeps every production queue non-partitioned in this PR, every v2 handle still surfaces asReceiptHandleIsInvaliddownstream — exactly the PR 5a observable behaviour.Reaper
The reaper iterates legacy byAge keys only — partition-iterating enumeration ships in the later partition-reaper PR.
buildReapOps/reapOneRecordpassnilmeta +partition 0through dispatch helpers so the keys are byte-identical to the pre-PR-5b layout.What does NOT change yet
CreateQueuestill rejectsPartitionCount > 1withInvalidAttributeValue(the §11 PR 2 dormancy gate, lifted atomically in PR 5b-3).Test plan
10 new tests across the contract surface.
Unit tests (
adapter/sqs_keys_dispatch_v2_test.go):TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount— pins the version dispatch decision acrossnil/0/1/4metas.TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1— protects the byte-identical guarantee on legacy queues.TestEncodeReceiptHandleDispatch_PerQueueUsesV2—perQueue + PartitionCount=4still produces v2 handles (keyspace is partitioned even whenpartitionForcollapses every group to partition 0).TestValidateReceiptHandleVersion_QueueAwareRules— 10 sub-cases covering the (meta.PartitionCount×handle.Version) matrix, includingperQueue + PartitionCount=4.TestValidateReceiptHandleVersion_NilHandle— defensive nil branch.TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned— named regression for the dormancy guarantee under the new contract.TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy— byte-identical to legacysqsMsgVisScanBoundson legacy meta.TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix— different partitions yield disjoint scan ranges.TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace— pins the PR feat(sqs): per-key dispatch helpers for partitioned-FIFO routing (Phase 3.D PR 5b-1) #731 round 2 forward-note invariant:perQueue + PartitionCount=4keeps the partitioned vis prefix at partition 0; collapsing to legacy would silently strand send writes.Integration tests (
adapter/sqs_partitioned_dispatch_test.go) install a partitioned meta directly on a queue created via the public API, short-circuiting the dormancy gate without disabling it for productionCreateQueue:TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip— end-to-end smoke test: send 6 groups, receive surfaces all via fanout, every handle is v2, delete via v2 handle, queue is empty afterwards, legacy keyspace stays empty.TestSQSServer_PartitionedFIFO_RejectsV1Handle— forged v1 handle on a partitioned queue surfaces asReceiptHandleIsInvalidviaDeleteMessageandChangeMessageVisibility.TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero—perQueue + PartitionCount=4receive surfaces every message in one fanout pass; every v2 handle recordsPartition=0.Updated
TestDecodeClientReceiptHandle_RejectsV2→TestDecodeClientReceiptHandle_AcceptsV2to reflect the contract shift (rejection moved from API boundary to meta-awarevalidateReceiptHandleVersion).go test -race ./adapter/...(targeted SQS scope) clean.golangci-lint run ./adapter/...clean.Self-review (per CLAUDE.md)
PartitionCount<=1routes to legacy constructors). DLQ FIFO computesdlqPartitionviapartitionFor(dlqMeta, srcRec.MessageGroupId)so cross-queue redrive lands in the right partition. Receive fanout scans every partition. No issue.metais loaded once and passed by pointer through helpers; no concurrent meta refresh in a single call.validateReceiptHandleVersionruns after the gen check so aSetQueueAttributesrace cannot flip the answer (PartitionCountis immutable).go test -raceclean.PartitionCountcompare per dispatch. Receive fanout iterateseffectivePartitionCount(meta)times: 1 on legacy /perQueue, N onperMessageGroupIdwith N partitions splitting roughly 1/N of the messages each.commitReceiveRotationusescand.partitionfor bothnewVisKeyand the handle). Cross-version handles rejected asReceiptHandleIsInvalid— no leak between keyspaces.perQueue + PartitionCount=4invariant pinned by both unit and integration tests.Caller audit (semantic-change discipline)
decodeClientReceiptHandlesemantics changed from "reject all v2" to "decode any version, defer version validation to meta-aware caller". All 3 production callers audited via grep:parseQueueAndReceipt(sqs_messages.go:1576) →deleteMessageWithRetry→loadMessageForDelete→validateReceiptHandleVersionafter meta load. ✅DeleteMessageBatchentry (sqs_messages_batch.go:475) →deleteMessageWithRetry→ same as above. ✅ChangeMessageVisibilityBatchentry (sqs_messages_batch.go:575) →changeVisibilityWithRetry→loadAndVerifyMessage→validateReceiptHandleVersion. ✅The dormancy guarantee (v2 →
ReceiptHandleIsInvalidon non-partitioned) is preserved across every caller path.Follow-ups
CreateQueuecapability check viaPollSQSHTFIFOCapability(feat(sqs): htfifo capability poller (Phase 3.D PR 4-B-3a) #721). Removes thevalidatePartitionDormancyGatecall and verifies cluster-wide HT-FIFO support before acceptingPartitionCount > 1.buildReapOps).Summary by CodeRabbit
New Features
Bug Fixes
Tests